/*-------------------------------------------------------------------------
 *
 * hash_xlog.c
 *      WAL replay logic for hash index.
 *
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *      src/backend/access/hash/hash_xlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam_xlog.h"
#include "access/bufmask.h"
#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/xlogutils.h"
#include "access/xlog.h"
#include "access/transam.h"
#include "storage/procarray.h"
#include "miscadmin.h"

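/*
 * General note on the redo routines below: XLogReadBufferForRedo() compares
 * the page LSN with the record's LSN and reports BLK_NEEDS_REDO only when
 * the change has not already been applied, which keeps replay idempotent.
 * After modifying a page we set its LSN and mark the buffer dirty, mirroring
 * what the original operation did.
 */
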
/*
 * replay a hash index meta page
 */
static void
hash_xlog_init_meta_page(XLogReaderState *record)
{
    XLogRecPtr    lsn = record->EndRecPtr;
    Page        page;
    Buffer        metabuf;
    ForkNumber    forknum;

    xl_hash_init_meta_page *xlrec = (xl_hash_init_meta_page *) XLogRecGetData(record);

    /* create the index's metapage */
    metabuf = XLogInitBufferForRedo(record, 0);
    Assert(BufferIsValid(metabuf));
    _hash_init_metabuffer(metabuf, xlrec->num_tuples, xlrec->procid,
                          xlrec->ffactor, true);
    page = (Page) BufferGetPage(metabuf);
    PageSetLSN(page, lsn);
    MarkBufferDirty(metabuf);

    /*
     * Force the on-disk state of init forks to always be in sync with the
     * state in shared buffers.  See XLogReadBufferForRedoExtended.  We need
     * special handling for init forks as create index operations don't log a
     * full page image of the metapage.
     */
    XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
    if (forknum == INIT_FORKNUM)
        FlushOneBuffer(metabuf);

    /* all done */
    UnlockReleaseBuffer(metabuf);
}

/*
 * replay a hash index bitmap page
 */
static void
hash_xlog_init_bitmap_page(XLogReaderState *record)
{
    XLogRecPtr    lsn = record->EndRecPtr;
    Buffer        bitmapbuf;
    Buffer        metabuf;
    Page        page;
    HashMetaPage metap;
    uint32        num_buckets;
    ForkNumber    forknum;

    xl_hash_init_bitmap_page *xlrec = (xl_hash_init_bitmap_page *) XLogRecGetData(record);

    /*
     * Initialize bitmap page
     */
    bitmapbuf = XLogInitBufferForRedo(record, 0);
    _hash_initbitmapbuffer(bitmapbuf, xlrec->bmsize, true);
    PageSetLSN(BufferGetPage(bitmapbuf), lsn);
    MarkBufferDirty(bitmapbuf);

    /*
     * Force the on-disk state of init forks to always be in sync with the
     * state in shared buffers.  See XLogReadBufferForRedoExtended.  We need
     * special handling for init forks as create index operations don't log a
     * full page image of the bitmap page.
     */
    XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
    if (forknum == INIT_FORKNUM)
        FlushOneBuffer(bitmapbuf);
    UnlockReleaseBuffer(bitmapbuf);

    /* add the new bitmap page to the metapage's list of bitmaps */
    if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
    {
        /*
         * Note: in normal operation, we'd update the metapage while still
         * holding lock on the bitmap page.  But during replay it's not
         * necessary to hold that lock, since nobody can see it yet; the
         * creating transaction hasn't yet committed.
         */
        page = BufferGetPage(metabuf);
        metap = HashPageGetMeta(page);

        num_buckets = metap->hashm_maxbucket + 1;
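        /*
         * At index creation the first bitmap page is allocated right after
         * the metapage (block 0) and the initial bucket pages (blocks 1 ..
         * num_buckets), so it lives at block num_buckets + 1.
         */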
        metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
        metap->hashm_nmaps++;

        PageSetLSN(page, lsn);
        MarkBufferDirty(metabuf);

        XLogRecGetBlockTag(record, 1, NULL, &forknum, NULL);
        if (forknum == INIT_FORKNUM)
            FlushOneBuffer(metabuf);
    }
    if (BufferIsValid(metabuf))
        UnlockReleaseBuffer(metabuf);
}

/*
 * replay a hash index insert without split
 */
static void
hash_xlog_insert(XLogReaderState *record)
{
    HashMetaPage metap;
    XLogRecPtr    lsn = record->EndRecPtr;
    xl_hash_insert *xlrec = (xl_hash_insert *) XLogRecGetData(record);
    Buffer        buffer;
    Page        page;

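    /* block 0 is the bucket or overflow page that received the tuple */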
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        Size        datalen;
        char       *datapos = XLogRecGetBlockData(record, 0, &datalen);

        page = BufferGetPage(buffer);

        if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
                        false, false) == InvalidOffsetNumber)
            elog(PANIC, "hash_xlog_insert: failed to add item");

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
    {
        /*
         * Note: in normal operation, we'd update the metapage while still
         * holding lock on the page we inserted into.  But during replay it's
         * not necessary to hold that lock, since no other index updates can
         * be happening concurrently.
         */
        page = BufferGetPage(buffer);
        metap = HashPageGetMeta(page);
        metap->hashm_ntuples += 1;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

/*
 * replay addition of overflow page for hash index
 */
static void
hash_xlog_add_ovfl_page(XLogReaderState *record)
{
    XLogRecPtr    lsn = record->EndRecPtr;
    xl_hash_add_ovfl_page *xlrec = (xl_hash_add_ovfl_page *) XLogRecGetData(record);
    Buffer        leftbuf;
    Buffer        ovflbuf;
    Buffer        metabuf;
    BlockNumber leftblk;
    BlockNumber rightblk;
    BlockNumber newmapblk = InvalidBlockNumber;
    Page        ovflpage;
    HashPageOpaque ovflopaque;
    uint32       *num_bucket;
    char       *data;
    Size        datalen PG_USED_FOR_ASSERTS_ONLY;
    bool        new_bmpage = false;

    XLogRecGetBlockTag(record, 0, NULL, NULL, &rightblk);
    XLogRecGetBlockTag(record, 1, NULL, NULL, &leftblk);

    ovflbuf = XLogInitBufferForRedo(record, 0);
    Assert(BufferIsValid(ovflbuf));

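    /* the block 0 payload is the number of the bucket owning the new page */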
    data = XLogRecGetBlockData(record, 0, &datalen);
    num_bucket = (uint32 *) data;
    Assert(datalen == sizeof(uint32));
    _hash_initbuf(ovflbuf, InvalidBlockNumber, *num_bucket, LH_OVERFLOW_PAGE,
                  true);
    /* update backlink */
    ovflpage = BufferGetPage(ovflbuf);
    ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
    ovflopaque->hasho_prevblkno = leftblk;

    PageSetLSN(ovflpage, lsn);
    MarkBufferDirty(ovflbuf);

    if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
    {
        Page        leftpage;
        HashPageOpaque leftopaque;

        leftpage = BufferGetPage(leftbuf);
        leftopaque = (HashPageOpaque) PageGetSpecialPointer(leftpage);
        leftopaque->hasho_nextblkno = rightblk;

        PageSetLSN(leftpage, lsn);
        MarkBufferDirty(leftbuf);
    }

    if (BufferIsValid(leftbuf))
        UnlockReleaseBuffer(leftbuf);
    UnlockReleaseBuffer(ovflbuf);

    /*
     * Note: in normal operation, we'd update the bitmap and meta page while
     * still holding lock on the overflow pages.  But during replay it's not
     * necessary to hold those locks, since no other index updates can be
     * happening concurrently.
     */
    if (XLogRecHasBlockRef(record, 2))
    {
        Buffer        mapbuffer;

        if (XLogReadBufferForRedo(record, 2, &mapbuffer) == BLK_NEEDS_REDO)
        {
            Page        mappage = (Page) BufferGetPage(mapbuffer);
            uint32       *freep = NULL;
            char       *data;
            uint32       *bitmap_page_bit;

            freep = HashPageGetBitmap(mappage);

            data = XLogRecGetBlockData(record, 2, &datalen);
            bitmap_page_bit = (uint32 *) data;

            SETBIT(freep, *bitmap_page_bit);

            PageSetLSN(mappage, lsn);
            MarkBufferDirty(mapbuffer);
        }
        if (BufferIsValid(mapbuffer))
            UnlockReleaseBuffer(mapbuffer);
    }

    if (XLogRecHasBlockRef(record, 3))
    {
        Buffer        newmapbuf;

        newmapbuf = XLogInitBufferForRedo(record, 3);

        _hash_initbitmapbuffer(newmapbuf, xlrec->bmsize, true);

        new_bmpage = true;
        newmapblk = BufferGetBlockNumber(newmapbuf);

        MarkBufferDirty(newmapbuf);
        PageSetLSN(BufferGetPage(newmapbuf), lsn);

        UnlockReleaseBuffer(newmapbuf);
    }

    if (XLogReadBufferForRedo(record, 4, &metabuf) == BLK_NEEDS_REDO)
    {
        HashMetaPage metap;
        Page        page;
        uint32       *firstfree_ovflpage;

        data = XLogRecGetBlockData(record, 4, &datalen);
        firstfree_ovflpage = (uint32 *) data;

        page = BufferGetPage(metabuf);
        metap = HashPageGetMeta(page);
        metap->hashm_firstfree = *firstfree_ovflpage;

        if (!xlrec->bmpage_found)
        {
            metap->hashm_spares[metap->hashm_ovflpoint]++;

            if (new_bmpage)
            {
                Assert(BlockNumberIsValid(newmapblk));

                metap->hashm_mapp[metap->hashm_nmaps] = newmapblk;
                metap->hashm_nmaps++;
                metap->hashm_spares[metap->hashm_ovflpoint]++;
            }
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(metabuf);
    }
    if (BufferIsValid(metabuf))
        UnlockReleaseBuffer(metabuf);
}

/*
 * replay allocation of page for split operation
 */
static void
hash_xlog_split_allocate_page(XLogReaderState *record)
{
    XLogRecPtr    lsn = record->EndRecPtr;
    xl_hash_split_allocate_page *xlrec = (xl_hash_split_allocate_page *) XLogRecGetData(record);
    Buffer        oldbuf;
    Buffer        newbuf;
    Buffer        metabuf;
    Size        datalen PG_USED_FOR_ASSERTS_ONLY;
    char       *data;
    XLogRedoAction action;

    /*
     * To be consistent with normal operation, here we take cleanup locks on
     * both the old and new buckets even though there can't be any concurrent
     * inserts.
     */

    /* replay the record for old bucket */
    action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &oldbuf);

    /*
     * Note that we still update the page even if it was restored from a full
     * page image, because the special space is not included in the image.
     */
    if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
    {
        Page        oldpage;
        HashPageOpaque oldopaque;

        oldpage = BufferGetPage(oldbuf);
        oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);

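        /*
         * For primary bucket pages, hasho_prevblkno caches the
         * hashm_maxbucket value as of the last split (see
         * _hash_getbucketbuf_from_hashkey); the new bucket's number is
         * exactly the new maxbucket, so store it in the old bucket here.
         */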
        oldopaque->hasho_flag = xlrec->old_bucket_flag;
        oldopaque->hasho_prevblkno = xlrec->new_bucket;

        PageSetLSN(oldpage, lsn);
        MarkBufferDirty(oldbuf);
    }

    /* replay the record for new bucket */
    newbuf = XLogInitBufferForRedo(record, 1);
    _hash_initbuf(newbuf, xlrec->new_bucket, xlrec->new_bucket,
                  xlrec->new_bucket_flag, true);
    if (!IsBufferCleanupOK(newbuf))
        elog(PANIC, "hash_xlog_split_allocate_page: failed to acquire cleanup lock");
    MarkBufferDirty(newbuf);
    PageSetLSN(BufferGetPage(newbuf), lsn);

    /*
     * We could release the lock on the old bucket earlier, but we keep it
     * until here to stay consistent with normal operation.
     */
    if (BufferIsValid(oldbuf))
        UnlockReleaseBuffer(oldbuf);
    if (BufferIsValid(newbuf))
        UnlockReleaseBuffer(newbuf);

    /*
     * Note: in normal operation, we'd update the meta page while still
     * holding lock on the old and new bucket pages.  But during replay it's
     * not necessary to hold those locks, since no other bucket splits can be
     * happening concurrently.
     */

    /* replay the record for metapage changes */
    if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO)
    {
        Page        page;
        HashMetaPage metap;

        page = BufferGetPage(metabuf);
        metap = HashPageGetMeta(page);
        metap->hashm_maxbucket = xlrec->new_bucket;

        data = XLogRecGetBlockData(record, 2, &datalen);

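        /*
         * The payload may contain, in order: the new low and high masks
         * (when XLH_SPLIT_META_UPDATE_MASKS is set), then the new split
         * point and its spares count (when XLH_SPLIT_META_UPDATE_SPLITPOINT
         * is set).
         */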
        if (xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS)
        {
            uint32        lowmask;
            uint32       *highmask;

            /* extract low and high masks. */
            memcpy(&lowmask, data, sizeof(uint32));
            highmask = (uint32 *) ((char *) data + sizeof(uint32));

            /* update metapage */
            metap->hashm_lowmask = lowmask;
            metap->hashm_highmask = *highmask;

            data += sizeof(uint32) * 2;
        }

        if (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT)
        {
            uint32        ovflpoint;
            uint32       *ovflpages;

            /* extract the new split point and its spares count. */
            memcpy(&ovflpoint, data, sizeof(uint32));
            ovflpages = (uint32 *) ((char *) data + sizeof(uint32));

            /* update metapage */
            metap->hashm_spares[ovflpoint] = *ovflpages;
            metap->hashm_ovflpoint = ovflpoint;
        }

        MarkBufferDirty(metabuf);
        PageSetLSN(BufferGetPage(metabuf), lsn);
    }

    if (BufferIsValid(metabuf))
        UnlockReleaseBuffer(metabuf);
}

/*
 * replay of split operation
 */
static void
hash_xlog_split_page(XLogReaderState *record)
{
    Buffer        buf;

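    /*
     * The split operation always logs this page as a full-page image, so
     * the only expected outcome here is BLK_RESTORED.
     */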
    if (XLogReadBufferForRedo(record, 0, &buf) != BLK_RESTORED)
        elog(ERROR, "Hash split record did not contain a full-page image");

    UnlockReleaseBuffer(buf);
}

/*
 * replay completion of split operation
 */
static void
hash_xlog_split_complete(XLogReaderState *record)
{
    XLogRecPtr    lsn = record->EndRecPtr;
    xl_hash_split_complete *xlrec = (xl_hash_split_complete *) XLogRecGetData(record);
    Buffer        oldbuf;
    Buffer        newbuf;
    XLogRedoAction action;

    /* replay the record for old bucket */
    action = XLogReadBufferForRedo(record, 0, &oldbuf);

    /*
     * Note that we still update the page even if it was restored from a full
     * page image, because the bucket flag is not included in the image.
     */
    if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
    {
        Page        oldpage;
        HashPageOpaque oldopaque;

        oldpage = BufferGetPage(oldbuf);
        oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);

        oldopaque->hasho_flag = xlrec->old_bucket_flag;

        PageSetLSN(oldpage, lsn);
        MarkBufferDirty(oldbuf);
    }
    if (BufferIsValid(oldbuf))
        UnlockReleaseBuffer(oldbuf);

    /* replay the record for new bucket */
    action = XLogReadBufferForRedo(record, 1, &newbuf);

    /*
     * Note that we still update the page even if it was restored from a full
     * page image, because the bucket flag is not included in the image.
     */
    if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
    {
        Page        newpage;
        HashPageOpaque nopaque;

        newpage = BufferGetPage(newbuf);
        nopaque = (HashPageOpaque) PageGetSpecialPointer(newpage);

        nopaque->hasho_flag = xlrec->new_bucket_flag;

        PageSetLSN(newpage, lsn);
        MarkBufferDirty(newbuf);
    }
    if (BufferIsValid(newbuf))
        UnlockReleaseBuffer(newbuf);
}

/*
 * replay move of page contents for squeeze operation of hash index
 */
static void
hash_xlog_move_page_contents(XLogReaderState *record)
{
    XLogRecPtr    lsn = record->EndRecPtr;
    xl_hash_move_page_contents *xldata = (xl_hash_move_page_contents *) XLogRecGetData(record);
    Buffer        bucketbuf = InvalidBuffer;
    Buffer        writebuf = InvalidBuffer;
    Buffer        deletebuf = InvalidBuffer;
    XLogRedoAction action;

    /*
     * Ensure we have a cleanup lock on the primary bucket page before we
     * start the actual replay operation.  This guarantees that no scan can
     * start, and none can already be in progress, while this operation is
     * replayed.  If scans were allowed during replay, they could miss some
     * records or see the same record multiple times.
     */
    if (xldata->is_prim_bucket_same_wrt)
        action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
    else
    {
        /*
         * We don't care about the return value; reading bucketbuf serves
         * only to take a cleanup lock on the primary bucket page.
         */
        (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);

        action = XLogReadBufferForRedo(record, 1, &writebuf);
    }

    /* replay the record for adding entries in overflow buffer */
    if (action == BLK_NEEDS_REDO)
    {
        Page        writepage;
        char       *begin;
        char       *data;
        Size        datalen;
        uint16        ninserted = 0;

        data = begin = XLogRecGetBlockData(record, 1, &datalen);

        writepage = (Page) BufferGetPage(writebuf);

        if (xldata->ntups > 0)
        {
            OffsetNumber *towrite = (OffsetNumber *) data;

            data += sizeof(OffsetNumber) * xldata->ntups;

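            /*
             * The block data starts with an array of ntups target offset
             * numbers, followed by the index tuples themselves; place each
             * tuple at its recorded offset.
             */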
            while (data - begin < datalen)
            {
                IndexTuple    itup = (IndexTuple) data;
                Size        itemsz;
                OffsetNumber l;

                itemsz = IndexTupleDSize(*itup);
                itemsz = MAXALIGN(itemsz);

                data += itemsz;

                l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
                if (l == InvalidOffsetNumber)
                    elog(ERROR, "hash_xlog_move_page_contents: failed to add item to hash index page, size %d bytes",
                         (int) itemsz);

                ninserted++;
            }
        }

        /*
         * The number of tuples inserted must match the count requested in
         * the REDO record.
         */
        Assert(ninserted == xldata->ntups);

        PageSetLSN(writepage, lsn);
        MarkBufferDirty(writebuf);
    }

    /* replay the record for deleting entries from overflow buffer */
    if (XLogReadBufferForRedo(record, 2, &deletebuf) == BLK_NEEDS_REDO)
    {
        Page        page;
        char       *ptr;
        Size        len;

        ptr = XLogRecGetBlockData(record, 2, &len);

        page = (Page) BufferGetPage(deletebuf);

        if (len > 0)
        {
            OffsetNumber *unused;
            OffsetNumber *unend;

            unused = (OffsetNumber *) ptr;
            unend = (OffsetNumber *) ((char *) ptr + len);

            if ((unend - unused) > 0)
                PageIndexMultiDelete(page, unused, unend - unused);
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(deletebuf);
    }

    /*
     * Replay is complete, so now we can release the buffers.  We release
     * the locks at the end of the replay operation to ensure that we hold
     * the lock on the primary bucket page until the whole operation is
     * done.  We could release the lock on the write buffer as soon as we
     * are finished with it, when it is not the primary bucket page, but
     * that doesn't seem worth complicating the code.
     */
    if (BufferIsValid(deletebuf))
        UnlockReleaseBuffer(deletebuf);

    if (BufferIsValid(writebuf))
        UnlockReleaseBuffer(writebuf);

    if (BufferIsValid(bucketbuf))
        UnlockReleaseBuffer(bucketbuf);
}

/*
 * replay squeeze page operation of hash index
 */
static void
hash_xlog_squeeze_page(XLogReaderState *record)
{
    XLogRecPtr    lsn = record->EndRecPtr;
    xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record);
    XLogRedoAction action;
    Buffer        bucketbuf = InvalidBuffer;
    Buffer        writebuf;
    Buffer        ovflbuf;
    Buffer        prevbuf = InvalidBuffer;
    Buffer        mapbuf;

    /*
     * Ensure we have a cleanup lock on the primary bucket page before we
     * start the actual replay operation.  This guarantees that no scan can
     * start, and none can already be in progress, while this operation is
     * replayed.  If scans were allowed during replay, they could miss some
     * records or see the same record multiple times.
     */
    if (xldata->is_prim_bucket_same_wrt)
    {
        action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
    }
    else
    {
        /*
         * We don't care about the return value; reading bucketbuf serves
         * only to take a cleanup lock on the primary bucket page.
         */
        (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);

        action = XLogReadBufferForRedo(record, 1, &writebuf);
    }

    /* replay the record for adding entries in overflow buffer */
    if (action == BLK_NEEDS_REDO)
    {
        Page        writepage;
        uint16        ninserted = 0;
        char       *begin;
        char       *data;
        Size        datalen;

        data = begin = XLogRecGetBlockData(record, 1, &datalen);

        writepage = (Page) BufferGetPage(writebuf);

        if (xldata->ntups > 0)
        {
            OffsetNumber *towrite = (OffsetNumber *) data;

            data += sizeof(OffsetNumber) * xldata->ntups;

            while (data - begin < datalen)
            {
                IndexTuple    itup = (IndexTuple) data;
                Size        itemsz;
                OffsetNumber l;

                itemsz = IndexTupleDSize(*itup);
                itemsz = MAXALIGN(itemsz);

                data += itemsz;

                l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
                if (l == InvalidOffsetNumber)
                    elog(ERROR, "hash_xlog_squeeze_page: failed to add item to hash index page, size %d bytes",
                         (int) itemsz);

                ninserted++;
            }
        }

        /*
         * The number of tuples inserted must match the count requested in
         * the REDO record.
         */
        Assert(ninserted == xldata->ntups);

        /*
         * If the page we are adding tuples to immediately precedes the
         * freed overflow page, update its nextblkno as well.
         */
        if (xldata->is_prev_bucket_same_wrt)
        {
            HashPageOpaque writeopaque = (HashPageOpaque) PageGetSpecialPointer(writepage);

            writeopaque->hasho_nextblkno = xldata->nextblkno;
        }

        PageSetLSN(writepage, lsn);
        MarkBufferDirty(writebuf);
    }

    /* replay the record for initializing overflow buffer */
    if (XLogReadBufferForRedo(record, 2, &ovflbuf) == BLK_NEEDS_REDO)
    {
        Page        ovflpage;
        HashPageOpaque ovflopaque;

        ovflpage = BufferGetPage(ovflbuf);

        _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));

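        /*
         * Mark the freed page as unused rather than removing it; hash
         * indexes never shrink, and the page can be recycled once its bit
         * is set again in a bitmap page.
         */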
        ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);

        ovflopaque->hasho_prevblkno = InvalidBlockNumber;
        ovflopaque->hasho_nextblkno = InvalidBlockNumber;
        ovflopaque->hasho_bucket = -1;
        ovflopaque->hasho_flag = LH_UNUSED_PAGE;
        ovflopaque->hasho_page_id = HASHO_PAGE_ID;

        PageSetLSN(ovflpage, lsn);
        MarkBufferDirty(ovflbuf);
    }
    if (BufferIsValid(ovflbuf))
        UnlockReleaseBuffer(ovflbuf);

    /* replay the record for page previous to the freed overflow page */
    if (!xldata->is_prev_bucket_same_wrt &&
        XLogReadBufferForRedo(record, 3, &prevbuf) == BLK_NEEDS_REDO)
    {
        Page        prevpage = BufferGetPage(prevbuf);
        HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);

        prevopaque->hasho_nextblkno = xldata->nextblkno;

        PageSetLSN(prevpage, lsn);
        MarkBufferDirty(prevbuf);
    }
    if (BufferIsValid(prevbuf))
        UnlockReleaseBuffer(prevbuf);

    /* replay the record for page next to the freed overflow page */
    if (XLogRecHasBlockRef(record, 4))
    {
        Buffer        nextbuf;

        if (XLogReadBufferForRedo(record, 4, &nextbuf) == BLK_NEEDS_REDO)
        {
            Page        nextpage = BufferGetPage(nextbuf);
            HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);

            nextopaque->hasho_prevblkno = xldata->prevblkno;

            PageSetLSN(nextpage, lsn);
            MarkBufferDirty(nextbuf);
        }
        if (BufferIsValid(nextbuf))
            UnlockReleaseBuffer(nextbuf);
    }

    if (BufferIsValid(writebuf))
        UnlockReleaseBuffer(writebuf);

    if (BufferIsValid(bucketbuf))
        UnlockReleaseBuffer(bucketbuf);

    /*
     * Note: in normal operation, we'd update the bitmap and meta page while
     * still holding lock on the primary bucket page and overflow pages.  But
     * during replay it's not necessary to hold those locks, since no other
     * index updates can be happening concurrently.
     */
    /* replay the record for bitmap page */
    if (XLogReadBufferForRedo(record, 5, &mapbuf) == BLK_NEEDS_REDO)
    {
        Page        mappage = (Page) BufferGetPage(mapbuf);
        uint32       *freep = NULL;
        char       *data;
        uint32       *bitmap_page_bit;
        Size        datalen;

        freep = HashPageGetBitmap(mappage);

        data = XLogRecGetBlockData(record, 5, &datalen);
        bitmap_page_bit = (uint32 *) data;

        CLRBIT(freep, *bitmap_page_bit);

        PageSetLSN(mappage, lsn);
        MarkBufferDirty(mapbuf);
    }
    if (BufferIsValid(mapbuf))
        UnlockReleaseBuffer(mapbuf);

    /* replay the record for meta page */
    if (XLogRecHasBlockRef(record, 6))
    {
        Buffer        metabuf;

        if (XLogReadBufferForRedo(record, 6, &metabuf) == BLK_NEEDS_REDO)
        {
            HashMetaPage metap;
            Page        page;
            char       *data;
            uint32       *firstfree_ovflpage;
            Size        datalen;

            data = XLogRecGetBlockData(record, 6, &datalen);
            firstfree_ovflpage = (uint32 *) data;

            page = BufferGetPage(metabuf);
            metap = HashPageGetMeta(page);
            metap->hashm_firstfree = *firstfree_ovflpage;

            PageSetLSN(page, lsn);
            MarkBufferDirty(metabuf);
        }
        if (BufferIsValid(metabuf))
            UnlockReleaseBuffer(metabuf);
    }
}

/*
 * replay delete operation of hash index
 */
static void
hash_xlog_delete(XLogReaderState *record)
{
    XLogRecPtr    lsn = record->EndRecPtr;
    xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record);
    Buffer        bucketbuf = InvalidBuffer;
    Buffer        deletebuf;
    Page        page;
    XLogRedoAction action;

    /*
     * Ensure we have a cleanup lock on the primary bucket page before we
     * start the actual replay operation.  This guarantees that no scan can
     * start, and none can already be in progress, while this operation is
     * replayed.  If scans were allowed during replay, they could miss some
     * records or see the same record multiple times.
     */
    if (xldata->is_primary_bucket_page)
        action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &deletebuf);
    else
    {
        /*
         * We don't care about the return value; reading bucketbuf serves
         * only to take a cleanup lock on the primary bucket page.
         */
        (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);

        action = XLogReadBufferForRedo(record, 1, &deletebuf);
    }

    /* replay the record for deleting entries in bucket page */
    if (action == BLK_NEEDS_REDO)
    {
        char       *ptr;
        Size        len;

        ptr = XLogRecGetBlockData(record, 1, &len);

        page = (Page) BufferGetPage(deletebuf);

        if (len > 0)
        {
            OffsetNumber *unused;
            OffsetNumber *unend;

            unused = (OffsetNumber *) ptr;
            unend = (OffsetNumber *) ((char *) ptr + len);

            if ((unend - unused) > 0)
                PageIndexMultiDelete(page, unused, unend - unused);
        }

        /*
         * Mark the page as not containing any LP_DEAD items only if
         * clear_dead_marking flag is set to true. See comments in
         * hashbucketcleanup() for details.
         */
        if (xldata->clear_dead_marking)
        {
            HashPageOpaque pageopaque;

            pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
            pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(deletebuf);
    }
    if (BufferIsValid(deletebuf))
        UnlockReleaseBuffer(deletebuf);

    if (BufferIsValid(bucketbuf))
        UnlockReleaseBuffer(bucketbuf);
}

/*
 * replay split cleanup flag operation for primary bucket page.
 */
static void
hash_xlog_split_cleanup(XLogReaderState *record)
{
    XLogRecPtr    lsn = record->EndRecPtr;
    Buffer        buffer;
    Page        page;

    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        HashPageOpaque bucket_opaque;

        page = (Page) BufferGetPage(buffer);

        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
        bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

/*
 * replay update of the meta page
 */
static void
hash_xlog_update_meta_page(XLogReaderState *record)
{
    HashMetaPage metap;
    XLogRecPtr    lsn = record->EndRecPtr;
    xl_hash_update_meta_page *xldata = (xl_hash_update_meta_page *) XLogRecGetData(record);
    Buffer        metabuf;
    Page        page;

    if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
    {
        page = BufferGetPage(metabuf);
        metap = HashPageGetMeta(page);

        metap->hashm_ntuples = xldata->ntuples;

        PageSetLSN(page, lsn);
        MarkBufferDirty(metabuf);
    }
    if (BufferIsValid(metabuf))
        UnlockReleaseBuffer(metabuf);
}

/*
 * Get the latestRemovedXid from the heap pages pointed at by the index
 * tuples being deleted. See also btree_xlog_delete_get_latestRemovedXid,
 * on which this function is based.
 */
static TransactionId
hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record)
{
    xl_hash_vacuum_one_page *xlrec;
    OffsetNumber *unused;
    Buffer        ibuffer,
                hbuffer;
    Page        ipage,
                hpage;
    RelFileNode rnode;
    BlockNumber blkno;
    ItemId        iitemid,
                hitemid;
    IndexTuple    itup;
    HeapTupleHeader htuphdr;
    BlockNumber hblkno;
    OffsetNumber hoffnum;
    TransactionId latestRemovedXid = InvalidTransactionId;
    int            i;

    xlrec = (xl_hash_vacuum_one_page *) XLogRecGetData(record);

    /*
     * If there's nothing running on the standby we don't need to derive a
     * full latestRemovedXid value, so use a fast path out of here.  This
     * returns InvalidTransactionId, and so will conflict with all HS
     * transactions; but since we just worked out that that's zero people,
     * it's OK.
     *
     * XXX There is a race condition here, which is that a new backend might
     * start just after we look.  If so, it cannot need to conflict, but this
     * coding will result in throwing a conflict anyway.
     */
    if (CountDBBackends(InvalidOid) == 0)
        return latestRemovedXid;

    /*
     * Check if WAL replay has reached a consistent database state. If not, we
     * must PANIC. See the definition of
     * btree_xlog_delete_get_latestRemovedXid for more details.
     */
    if (!reachedConsistency)
        elog(PANIC, "hash_xlog_vacuum_get_latestRemovedXid: cannot operate with inconsistent data");

    /*
     * Get index page.  If the DB is consistent, this should not fail, nor
     * should any of the heap page fetches below.  If one does, we return
     * InvalidTransactionId to cancel all HS transactions.  That's probably
     * overkill, but it's safe, and certainly better than panicking here.
     */
    XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
    ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);

    if (!BufferIsValid(ibuffer))
        return InvalidTransactionId;
    LockBuffer(ibuffer, HASH_READ);
    ipage = (Page) BufferGetPage(ibuffer);

    /*
     * Loop through the deleted index items to obtain the TransactionId from
     * the heap items they point to.
     */
    unused = (OffsetNumber *) ((char *) xlrec + SizeOfHashVacuumOnePage);

    for (i = 0; i < xlrec->ntuples; i++)
    {
        /*
         * Identify the index tuple about to be deleted.
         */
        iitemid = PageGetItemId(ipage, unused[i]);
        itup = (IndexTuple) PageGetItem(ipage, iitemid);

        /*
         * Locate the heap page that the index tuple points at
         */
        hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
        hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM,
                                         hblkno, RBM_NORMAL);

        if (!BufferIsValid(hbuffer))
        {
            UnlockReleaseBuffer(ibuffer);
            return InvalidTransactionId;
        }
        LockBuffer(hbuffer, HASH_READ);
        hpage = (Page) BufferGetPage(hbuffer);

        /*
         * Look up the heap tuple header that the index tuple points at by
         * using the heap node supplied with the xlrec. We can't use
         * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
         * Note that we are not looking at tuple data here, just headers.
         */
        hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
        hitemid = PageGetItemId(hpage, hoffnum);

        /*
         * Follow any redirections until we find something useful.
         */
        while (ItemIdIsRedirected(hitemid))
        {
            hoffnum = ItemIdGetRedirect(hitemid);
            hitemid = PageGetItemId(hpage, hoffnum);
            CHECK_FOR_INTERRUPTS();
        }

        /*
         * If the heap item has storage, then read the header and use that to
         * set latestRemovedXid.
         *
         * Some LP_DEAD items may not be accessible, so we ignore them.
         */
        if (ItemIdHasStorage(hitemid))
        {
            htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
            HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
        }
        else if (ItemIdIsDead(hitemid))
        {
            /*
             * Conjecture: if hitemid is dead then it had xids before the
             * xids marked on LP_NORMAL items.  So we just ignore this item
             * and move on to the next, for the purposes of calculating
             * latestRemovedXid.
             */
        }
        else
        {
            Assert(!ItemIdIsUsed(hitemid));
        }
        UnlockReleaseBuffer(hbuffer);
    }

    UnlockReleaseBuffer(ibuffer);

    /*
     * If all heap tuples were LP_DEAD then we will be returning
     * InvalidTransactionId here, which avoids conflicts. This matches
     * existing logic which assumes that LP_DEAD tuples must already be older
     * than the latestRemovedXid on the cleanup record that set them as
     * LP_DEAD, hence must already have generated a conflict.
     */
    return latestRemovedXid;
}

/*
 * replay delete operation in hash index to remove
 * tuples marked as DEAD during index tuple insertion.
 */
static void
hash_xlog_vacuum_one_page(XLogReaderState *record)
{
    XLogRecPtr    lsn = record->EndRecPtr;
    xl_hash_vacuum_one_page *xldata;
    Buffer        buffer;
    Buffer        metabuf;
    Page        page;
    XLogRedoAction action;
    HashPageOpaque pageopaque;

    xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);

    /*
     * If we have any conflict processing to do, it must happen before we
     * update the page.
     *
     * Hash index records that are marked as LP_DEAD and being removed during
     * hash index tuple insertion can conflict with standby queries. You might
     * think that vacuum records would conflict as well, but we've handled
     * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
     * cleaned by the vacuum of the heap and so we can resolve any conflicts
     * just once when that arrives.  After that we know that no conflicts
     * exist from individual hash index vacuum records on that index.
     */
    if (InHotStandby)
    {
        TransactionId latestRemovedXid =
                    hash_xlog_vacuum_get_latestRemovedXid(record);
        RelFileNode rnode;

        XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
        ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
    }

    action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);

    if (action == BLK_NEEDS_REDO)
    {
        page = (Page) BufferGetPage(buffer);

        if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage)
        {
            OffsetNumber *unused;

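            /* the deletable offsets follow the fixed-size record struct */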
            unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage);

            PageIndexMultiDelete(page, unused, xldata->ntuples);
        }

        /*
         * Mark the page as not containing any LP_DEAD items. See comments in
         * _hash_vacuum_one_page() for details.
         */
        pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
        pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
    {
        Page        metapage;
        HashMetaPage metap;

        metapage = BufferGetPage(metabuf);
        metap = HashPageGetMeta(metapage);

        metap->hashm_ntuples -= xldata->ntuples;

        PageSetLSN(metapage, lsn);
        MarkBufferDirty(metabuf);
    }
    if (BufferIsValid(metabuf))
        UnlockReleaseBuffer(metabuf);
}

void
hash_redo(XLogReaderState *record)
{
    uint8        info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

    switch (info)
    {
        case XLOG_HASH_INIT_META_PAGE:
            hash_xlog_init_meta_page(record);
            break;
        case XLOG_HASH_INIT_BITMAP_PAGE:
            hash_xlog_init_bitmap_page(record);
            break;
        case XLOG_HASH_INSERT:
            hash_xlog_insert(record);
            break;
        case XLOG_HASH_ADD_OVFL_PAGE:
            hash_xlog_add_ovfl_page(record);
            break;
        case XLOG_HASH_SPLIT_ALLOCATE_PAGE:
            hash_xlog_split_allocate_page(record);
            break;
        case XLOG_HASH_SPLIT_PAGE:
            hash_xlog_split_page(record);
            break;
        case XLOG_HASH_SPLIT_COMPLETE:
            hash_xlog_split_complete(record);
            break;
        case XLOG_HASH_MOVE_PAGE_CONTENTS:
            hash_xlog_move_page_contents(record);
            break;
        case XLOG_HASH_SQUEEZE_PAGE:
            hash_xlog_squeeze_page(record);
            break;
        case XLOG_HASH_DELETE:
            hash_xlog_delete(record);
            break;
        case XLOG_HASH_SPLIT_CLEANUP:
            hash_xlog_split_cleanup(record);
            break;
        case XLOG_HASH_UPDATE_META_PAGE:
            hash_xlog_update_meta_page(record);
            break;
        case XLOG_HASH_VACUUM_ONE_PAGE:
            hash_xlog_vacuum_one_page(record);
            break;
        default:
            elog(PANIC, "hash_redo: unknown op code %u", info);
    }
}

/*
 * Mask a hash page before performing consistency checks on it.
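 *
 * This is used when wal_consistency_checking includes the hash resource
 * manager: fields that can legitimately differ between primary and standby
 * (page LSN, hint bits, unused space) are masked before the replayed page
 * is compared with the full-page image from WAL.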
 */
void
hash_mask(char *pagedata, BlockNumber blkno)
{
    Page        page = (Page) pagedata;
    HashPageOpaque opaque;
    int            pagetype;

    mask_page_lsn(page);

    mask_page_hint_bits(page);
    mask_unused_space(page);

    opaque = (HashPageOpaque) PageGetSpecialPointer(page);

    pagetype = opaque->hasho_flag & LH_PAGE_TYPE;
    if (pagetype == LH_UNUSED_PAGE)
    {
        /*
         * Mask everything on a UNUSED page.
         */
        mask_page_content(page);
    }
    else if (pagetype == LH_BUCKET_PAGE ||
             pagetype == LH_OVERFLOW_PAGE)
    {
        /*
         * In hash bucket and overflow pages, it is possible to modify the
         * LP_FLAGS without emitting any WAL record. Hence, mask the line
         * pointer flags. See hashgettuple(), _hash_kill_items() for details.
         */
        mask_lp_flags(page);
    }

    /*
     * It is possible that the hint bit LH_PAGE_HAS_DEAD_TUPLES may remain
     * unlogged. So, mask it. See _hash_kill_items() for details.
     */
    opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
}
